Workflowsでワークフローから他のワークフローを並列実行してみる

Workflowsでワークフローから他のワークフローを並列実行してみる

Workflowsではワークフローから他のワークフローを起動することができます。この機能を用いることで規模の大きい処理でも分散して処理することが可能になります。
Clock Icon2024.10.07

Workflowsはご存知の通りGoogle Cloudのさまざまなリソースを呼び出してオーケストレーションするワークフローツールです。
そして、WorkflowsからWorkflowsのワークフローを呼び出すこともできます。
Workflowsから別のワークフローを呼び出すことで、親のワークフローから並列で子ワークフローに値を渡して、複数の子ワークフローで分散処理する、というようなワークフローを組むことができます。 
スクリーンショット 2024-10-07 17.25.23

※引用:https://cloud.google.com/workflows/docs/tutorials/execute-workflows-from-workflow?hl=ja

今回は公式リファレンスに並列実行で他のワークフローを呼び出すチュートリアルがあったので試してみました。
https://cloud.google.com/workflows/docs/tutorials/execute-workflows-from-workflow?hl=ja

やってみる

※サービスアカウント作成やWorkflows APIの有効化などは済んでいる前提です

ソースコードの準備

親と子のワークフローを作成します。
まずは子のワークフローからです。

yaml
main:
  params: [args]
  steps:
    - init:
        assign:
          - iteration : ${args.iteration}
    - wait:
        call: sys.sleep
        args:
            seconds: 10
    - check_iteration_even_or_odd:
        switch:
          - condition: ${iteration % 2 == 0}
            next: raise_error
    - return_message:
        return: ${"Hello world"+iteration}
    - raise_error:
        raise: ${"Error with iteration "+iteration}

ソースコードができたらworkflow-childの名前でデプロイします。親のワークフローと同じリージョンです。
簡単に解説します。

  1. init

    • 目的: argsからiterationという変数を初期化
    • 動作: iterationargs.iterationの値を代入
  2. wait

    • 目的: 指定された秒数だけ待機
    • 動作: sys.sleepを呼び出して、10秒間の待機を実行
  3. check_iteration_even_or_odd

    • 目的: iterationが偶数か奇数かを判定
    • 動作:
      • switch文を使用して、iterationが偶数の場合にraise_errorステップに進みます。
      • 奇数の場合はこのステップをスキップし、return_messageに進みます。
  4. return_message

    • 目的: iterationが奇数の場合、メッセージを返却
    • 動作: "Hello world"iterationの値を連結して返します。
    • : iterationが3の場合、"Hello world3"を返却
  5. raise_error

    • 目的: iterationが偶数の場合、エラーを発生
    • 動作: "Error with iteration "iterationの値を連結してエラーを発生させます。
    • : iterationが2の場合、"Error with iteration 2"というエラーを発生

このワークフローは、指定されたiterationの値に基づいて動作が異なります。10秒の待機の後、iterationが偶数ならエラーを発生させ、奇数ならメッセージを返却します。
呼び出し元の親のワークフローでは奇数と偶数の数字を渡すので、実行すると2種類の挙動を観察することができます。

それでは続けて親のワークフローを実装します。

yaml
main:
  steps:
    - init:
        assign:
          - execution_results: {} # Initialize a dictionary to store execution results
          - execution_results.success: {} # Dictionary for successful executions
          - execution_results.failure: {} # Dictionary for failed executions
    - execute_child_workflows:
        parallel:
          shared: [execution_results]
          for:
            value: iteration
            in: [1, 2, 3, 4]
            steps:
              - iterate:
                  try:
                    steps:
                      - execute_child_workflow:
                          call: googleapis.workflowexecutions.v1.projects.locations.workflows.executions.run
                          args:
                            workflow_id: "workflow-child"
                            # Specify location and project_id if necessary
                            # location: "your-location"
                            # project_id: "your-project-id"
                            argument:
                              iteration: ${iteration}
                          result: execution_result
                      - save_successful_execution:
                          assign:
                            - execution_results.success[string(iteration)]: ${execution_result}
                  except:
                    as: e
                    steps:
                      - save_failed_execution:
                          assign:
                            - execution_results.failure[string(iteration)]: ${e}
    - logStep:
        call: sys.log
        args:
            text: ${execution_results}
            severity: DEBUG
    - return_execution_results:
        return: ${execution_results}

ほぼリファレンスのままなのですが、途中のlogStepだけ追加しています。

    - logStep:
        call: sys.log
        args:
            text: ${execution_results}
            severity: DEBUG

ログ出力して詳細を見たかったからです。
適当な名前でデプロイしてください。リージョンは子ワークフローと同じです。

こちらも解説します。

  1. init

    • 目的: execution_resultsという辞書を初期化し、成功と失敗の結果を格納するためのサブ辞書を作成
    • 動作:
      • execution_resultsは、全体の実行結果を格納するための変数です
      • execution_results.successは、成功した実行の結果を格納します
      • execution_results.failureは、失敗した実行の結果を格納します
  2. execute_child_workflows

    • 目的: 複数の子ワークフローを並列で実行し、それぞれの結果を変数に格納
    • 動作:
      • parallelブロック内で、execution_resultsを共有しながら、イテレーションごとに子ワークフローを実行します。子ワークフローの実行はコネクタgoogleapis.workflowexecutions.v1.projects.locations.workflows.executions.runを用います
      • forループで、1から4までのイテレーションを実行します
      • tryブロックで、子ワークフローを実行します
      • 成功した場合は、execution_results.successに結果を保存
      • 失敗した場合は、exceptブロックでエラーをキャッチし、execution_results.failureにエラー情報を保存
  3. logStep

    • 目的: 実行結果をログに記録
    • 動作:
      • sys.logを呼び出し、execution_results全体をログ出力
  4. return_execution_results

    • 目的: 最終的な実行結果を返却
    • 動作:
      • execution_resultsを返します。これには、すべての子ワークフロー実行結果が含まれています

このワークフローは、指定された範囲(1-4まで)のイテレーションに対して子ワークフローを並列に実行し、その結果を変数に格納します。各実行の成功または失敗に応じて結果を保存し、最後にすべての結果をログします。
この親ワークフローを実行すると、実際に子ワークフローが実行された場合の挙動を観察することができます。

実際に動かしてみる

親のワークフローを実行してみてください。
実行後にreturn_execution_results 成功と表示されていたら成功しています。
スクリーンショット 2024-10-07 22.12.56

ログを見てみましょう。
以下のようなJSONが出力されていると思います(以下は整形しています)

{
  "failure": {
    "2": {
      "message": "Execution failed or cancelled.",
      "operation": {
        "argument": "{\"iteration\":2}",
        "createTime": "2024-10-07T12:58:04.704360244Z",
        "disableConcurrencyQuotaOverflowBuffering": true,
        "duration": "10.154318247s",
        "endTime": "2024-10-07T12:58:14.858678491Z",
        "error": {
          "context": "RuntimeError: \"Error with iteration 2\"\nin step \"raise_error\", routine \"main\", line: 18",
          "payload": "\"Error with iteration 2\"",
          "stackTrace": {
            "elements": [
              {
                "position": {
                  "column": "9",
                  "length": "5",
                  "line": "18"
                },
                "routine": "main",
                "step": "raise_error"
              }
            ]
          }
        },
        "name": "projects/**********/locations/asia-northeast1/workflows/workflow-child/executions/************************",
        "startTime": "2024-10-07T12:58:04.704360244Z",
        "state": "FAILED",
        "status": {
          "currentSteps": [
            {
              "routine": "main",
              "step": "raise_error"
            }
          ]
        },
        "workflowRevisionId": "000001-276"
      },
      "tags": ["OperationError"]
    },
    "4": {
      "message": "Execution failed or cancelled.",
      "operation": {
        "argument": "{\"iteration\":4}",
        "createTime": "2024-10-07T12:58:04.575646768Z",
        "disableConcurrencyQuotaOverflowBuffering": true,
        "duration": "10.135465986s",
        "endTime": "2024-10-07T12:58:14.711112754Z",
        "error": {
          "context": "RuntimeError: \"Error with iteration 4\"\nin step \"raise_error\", routine \"main\", line: 18",
          "payload": "\"Error with iteration 4\"",
          "stackTrace": {
            "elements": [
              {
                "position": {
                  "column": "9",
                  "length": "5",
                  "line": "18"
                },
                "routine": "main",
                "step": "raise_error"
              }
            ]
          }
        },
        "name": "projects/**********/locations/asia-northeast1/workflows/workflow-child/executions/************************",
        "startTime": "2024-10-07T12:58:04.575646768Z",
        "state": "FAILED",
        "status": {
          "currentSteps": [
            {
              "routine": "main",
              "step": "raise_error"
            }
          ]
        },
        "workflowRevisionId": "000001-276"
      },
      "tags": ["OperationError"]
    }
  },
  "success": {
    "1": "Hello world1",
    "3": "Hello world3"
  }
}

13のワークフローが成功していて、24のワークフローが失敗していることが上記よりわかります。
子ワークフローの実行履歴を見てみます。
スクリーンショット 2024-10-07 22.17.53

2つの成功と2つの失敗が記録されていますね。作成日時も同一で並列実行されている様子がわかります。

以上の結果より、親のワークフローから子のワークフローを並列実行できることがわかりました。

まとめ

ワークフローからワークフローを呼び出すことができることがわかりました。
この機能を用いると、複数の子ワークフローを並列で起動して、さらにその子ワークフローで並列で
処理をする、ということも可能です。
親は処理対象を子に渡すだけ、子は処理対象をじっくりと処理する、というように役割分断ができるようになります。そうすることで実装もシンプルになり保守性や可読性も向上するかもしれません。
複雑な処理を行う場合、単一のワークフローで処理するよりもこのようにワークフローを
分割することも視野に入れても良いのかもしれません。

今回は基本的なワークフローからワークフローの呼び出しを試してみました。
次のブログではもう少し深掘りしたいと思います。
それではまた。

参考

https://cloud.google.com/workflows/docs/tutorials/execute-workflows-from-workflow?hl=ja

Share this article

facebook logohatena logotwitter logo

© Classmethod, Inc. All rights reserved.